﻿using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Naruto.Redis;
using Naruto.Subscribe.Extension;
using Naruto.Subscribe.Interface;
using Naruto.Subscribe.Object;
using Naruto.Subscribe.Provider.Redis.Object;
using Newtonsoft.Json;
using StackExchange.Redis;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace Naruto.Subscribe.Provider.Redis.Internal
{
    /// <summary>
    /// redis订阅的实现
    /// </summary>
    public class RedisConsumerClient : IConsumerClient
    {
        /// <summary>
        /// redis仓储
        /// </summary>
        private readonly IRedisRepository redis;

        private readonly ILogger logger;
        /// <summary>
        /// 
        /// </summary>
        private readonly MessageQueueOption messageQueueOption;
        /// <summary>
        /// 消息接收
        /// </summary>

        public event EventHandler<MessageModel> OnMessageReceived;

        /// <summary>
        /// 
        /// </summary>
        private List<string> subscribeNames;
        /// <summary>
        /// 
        /// </summary>
        /// <param name="_redis"></param>
        /// <param name="_logger"></param>
        /// <param name="_subscribeHandler"></param>
        /// <param name="_messageQueueOption"></param>
        public RedisConsumerClient(IRedisRepository _redis, ILogger _logger, MessageQueueOption _messageQueueOption)
        {
            redis = _redis;
            logger = _logger;
            messageQueueOption = _messageQueueOption;
        }

        public void Dispose()
        {
            Dispose(true);
        }

        protected virtual void Dispose(bool isDispose)
        {
            if (isDispose)
            {
            }
        }
        /// <summary>
        /// 处理订阅信息
        /// </summary>
        /// <param name="subscribeName"></param>
        /// <returns></returns>
        public async Task SubscribeAsync(List<string> _subscribeNames, CancellationToken cancellationToken = default)
        {
            cancellationToken.ThrowIfCancellationRequested();
            subscribeNames = _subscribeNames;
            _subscribeNames.CheckNull();
            foreach (var subscribeName in _subscribeNames)
            {
                //创建消费者关联信息
                await redis.Stream.CreateConsumerGroupAsync(subscribeName, messageQueueOption.ConsumerGroup, cancellationToken);
            }
        }
        /// <summary>
        /// 监听
        /// </summary>
        /// <param name="cancellationToken"></param>
        /// <returns></returns>
        public async Task ListingAsync(CancellationToken cancellationToken = default)
        {
            cancellationToken.ThrowIfCancellationRequested();
            //优先处理待处理的队列
            var pengdingStreamPosition = subscribeNames.Select(a => new StreamPosition(a, StreamPosition.Beginning)).ToArray();
            var pengdingRedisStream = PollPengdingMessageAsync(pengdingStreamPosition);
            await ConsumerAsync(pengdingRedisStream, cancellationToken);
            //新消息
            var newStreamPosition = subscribeNames.Select(a => new StreamPosition(a, StreamPosition.NewMessages)).ToArray();
            var newRedisStream = PollNewMessageAsync(newStreamPosition);

            await ConsumerAsync(newRedisStream, cancellationToken);
        }
        /// <summary>
        /// 读取 因异常未处理的消息
        /// </summary>
        /// <param name="subscribeNames"></param>
        /// <param name="cancellationToken"></param>
        /// <returns></returns>
        protected virtual async IAsyncEnumerable<RedisStream[]> PollPengdingMessageAsync(StreamPosition[] streamPosition)
        {
            while (true)
            {
                var streamList = await redis.Stream.StreamReadGroupAsync(streamPosition, messageQueueOption.ConsumerGroup, messageQueueOption.ConsumerGroup, messageQueueOption.CountPerStream);
                //当消息数没有的之后就不再读取
                if (streamList == null || streamList.Count() <= 0 || streamList.All(a => a.Entries.Length <= 0))
                    break;
                yield return streamList;
            }
        }

        /// <summary>
        /// 读取 新消息
        /// </summary>
        /// <param name="subscribeNames"></param>
        /// <param name="cancellationToken"></param>
        /// <returns></returns>
        protected virtual async IAsyncEnumerable<RedisStream[]> PollNewMessageAsync(StreamPosition[] streamPosition)
        {
            while (true)
            {
                await Task.Delay(10);
                var streamList = await redis.Stream.StreamReadGroupAsync(streamPosition, messageQueueOption.ConsumerGroup, messageQueueOption.ConsumerGroup, messageQueueOption.CountPerStream);
                yield return streamList;
            }
        }
        /// <summary>
        /// 消费数据
        /// </summary>
        /// <param name="consumerMessages"></param>
        /// <param name="cancellationToken"></param>
        /// <returns></returns>
        protected virtual async Task ConsumerAsync(IAsyncEnumerable<RedisStream[]> consumerMessages, CancellationToken cancellationToken)
        {
            cancellationToken.ThrowIfCancellationRequested();
            await foreach (var itemRedisStreams in consumerMessages)
                foreach (var redisStream in itemRedisStreams)
                    foreach (var itemEntrie in redisStream.Entries)
                    {
                        if (itemEntrie.IsNull)
                            continue;
                        OnMessageReceived?.Invoke((redisStream.Key.ToString(), itemEntrie.Id.ToString()), Encoding.UTF8.GetString(itemEntrie["body"]).ToDeserialized<MessageModel>());
                    }
        }

        /// <summary>
        /// 提交
        /// </summary>
        /// <param name="sender"></param>
        /// <returns></returns>
        public async Task CommitAsync(object sender)
        {
            var source = ((string subscribeName, string messageId))sender;
            //消息确认
            await redis.Stream.AckAsync(source.subscribeName, messageQueueOption.ConsumerGroup, source.messageId);
        }
        /// <summary>
        /// 消息重置
        /// </summary>
        /// <param name="sender"></param>
        /// <returns></returns>
        public Task ResetAsync(object sender)
        {

            return Task.CompletedTask;
        }
    }
}
